package flink.eureka.connector

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup

/**
  *
  * @author eureka.wh
  * @since 2019-06-11 09:24
  */
// Demonstrates using Kafka as a Flink streaming source.
object KafkaConnectorConsumerApp {

  /**
    * Entry point of the job: reads string records from the Kafka topic `h1`
    * and prints every record to standard output.
    *
    * @param args command-line arguments; not read by this job
    */
  def main(args: Array[String]): Unit = {
    // Brings the implicit TypeInformation instances required by addSource into scope.
    import org.apache.flink.api.scala._

    println("哈哈哈~~~KafkaConnectorConsumerApp")

    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    configureCheckpointing(streamEnv)

    // Start the weihua01 producer to generate data for this topic.
    val kafkaSource =
      new FlinkKafkaConsumer[String]("h1", new SimpleStringSchema(), consumerProperties())

    streamEnv.addSource(kafkaSource).print()

    streamEnv.execute("KafkaConnectorConsumerApp")
  }

  /**
    * Applies the commonly used checkpoint settings to the given environment:
    * a 4000 interval, exactly-once mode, a 10000 timeout, no failure on
    * checkpointing errors, and at most one concurrent checkpoint.
    *
    * @param env the execution environment to configure
    */
  private def configureCheckpointing(env: StreamExecutionEnvironment): Unit = {
    env.enableCheckpointing(4000)

    val checkpointCfg = env.getCheckpointConfig
    checkpointCfg.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    checkpointCfg.setCheckpointTimeout(10000)
    checkpointCfg.setFailOnCheckpointingErrors(false)
    checkpointCfg.setMaxConcurrentCheckpoints(1)
  }

  /**
    * Builds the Kafka consumer configuration (broker list and consumer group).
    *
    * @return a fresh [[java.util.Properties]] holding the consumer settings
    */
  private def consumerProperties(): Properties = {
    val props = new Properties()

    // The hostname-to-IP mapping for the brokers must be configured on the
    // machine that runs this job (e.g. the one running the IDE).
    props.setProperty("bootstrap.servers", "cm01:9092,cm02:9092,cm03:9092")
//    props.setProperty("zookeeper.connect", "cm01:2181")
    props.setProperty("group.id", "test")

    props
  }

}
